fetcher: Drop the libsoup queue
authorColin Walters <walters@verbum.org>
Thu, 19 Jan 2017 10:34:20 +0000 (05:34 -0500)
committerAtomic Bot <atomic-devel@projectatomic.io>
Tue, 7 Feb 2017 19:59:40 +0000 (19:59 +0000)
Now that we have queuing in the higher level pull logic, we don't
need to do this anymore.

It's tempting to keep it since the code diff is so small (without
completely rewriting things), but dropping it here will make
it easier to see when things go wrong at a higher level.

Note that I kept an assertion.

Closes: #654
Approved by: jlebon

src/libostree/ostree-fetcher.c

index a178abfe756c283556c9323ff8cc0690384810d5..bc6c14c9ec5db5dddff8f44afb8a5170046eb883 100644 (file)
@@ -62,8 +62,7 @@ typedef struct {
   GVariant *extra_headers;
   int max_outstanding;
 
-  /* Queue for libsoup, see bgo#708591 */
-  GQueue pending_queue;
+  /* Our active HTTP requests */
   GHashTable *outstanding;
 
   /* Shared across threads; be sure to lock. */
@@ -77,9 +76,6 @@ typedef struct {
 
 } ThreadClosure;
 
-static void
-session_thread_process_pending_queue (ThreadClosure *thread_closure);
-
 typedef struct {
   volatile int ref_count;
 
@@ -187,18 +183,6 @@ idle_closure_free (IdleClosure *idle_closure)
   g_slice_free (IdleClosure, idle_closure);
 }
 
-static int
-pending_task_compare (gconstpointer a,
-                      gconstpointer b,
-                      gpointer unused)
-{
-  gint priority_a = g_task_get_priority (G_TASK (a));
-  gint priority_b = g_task_get_priority (G_TASK (b));
-
-  return (priority_a == priority_b) ? 0 :
-         (priority_a < priority_b) ? -1 : 1;
-}
-
 static OstreeFetcherPendingURI *
 pending_uri_ref (OstreeFetcherPendingURI *pending)
 {
@@ -403,30 +387,23 @@ static void
 on_request_sent (GObject        *object, GAsyncResult   *result, gpointer        user_data);
 
 static void
-session_thread_process_pending_queue (ThreadClosure *thread_closure)
+start_pending_request (ThreadClosure *thread_closure,
+                       GTask         *task)
 {
 
-  while (g_queue_peek_head (&thread_closure->pending_queue) != NULL &&
-         g_hash_table_size (thread_closure->outstanding) < thread_closure->max_outstanding)
-    {
-      GTask *task;
-      OstreeFetcherPendingURI *pending;
-      GCancellable *cancellable;
-
-      task = g_queue_pop_head (&thread_closure->pending_queue);
-
-      pending = g_task_get_task_data (task);
-      cancellable = g_task_get_cancellable (task);
+  OstreeFetcherPendingURI *pending;
+  GCancellable *cancellable;
 
-      g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
+  g_assert_cmpint (g_hash_table_size (thread_closure->outstanding), <, thread_closure->max_outstanding);
 
-      soup_request_send_async (pending->request,
-                               cancellable,
-                               on_request_sent,
-                               g_object_ref (task));
+  pending = g_task_get_task_data (task);
+  cancellable = g_task_get_cancellable (task);
 
-      g_object_unref (task);
-    }
+  g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
+  soup_request_send_async (pending->request,
+                           cancellable,
+                           on_request_sent,
+                           g_object_ref (task));
 }
 
 static void
@@ -547,10 +524,7 @@ session_thread_request_uri (ThreadClosure *thread_closure,
       pending->out_tmpfile = tmpfile;
       tmpfile = NULL; /* Transfer ownership */
 
-      g_queue_insert_sorted (&thread_closure->pending_queue,
-                             g_object_ref (task),
-                             pending_task_compare, NULL);
-      session_thread_process_pending_queue (thread_closure);
+      start_pending_request (thread_closure, task);
     }
 }
 
@@ -600,8 +574,6 @@ ostree_fetcher_session_thread (gpointer data)
    * unreference all data related to the SoupSession ourself to ensure
    * it's freed in the same thread where it was created. */
   g_clear_pointer (&closure->outstanding, g_hash_table_unref);
-  while (!g_queue_is_empty (&closure->pending_queue))
-    g_object_unref (g_queue_pop_head (&closure->pending_queue));
   g_clear_pointer (&closure->session, g_object_unref);
 
   thread_closure_unref (closure);
@@ -903,11 +875,6 @@ finish_stream (OstreeFetcherPendingURI *pending,
 
   pending->state = OSTREE_FETCHER_STATE_COMPLETE;
 
-  /* Now that we've finished downloading, continue with other queued
-   * requests.
-   */
-  session_thread_process_pending_queue (pending->thread_closure);
-
   if (!pending->is_membuf)
     {
       if (stbuf.st_size < pending->content_length)
@@ -935,14 +902,13 @@ on_stream_read (GObject        *object,
                 gpointer        user_data);
 
 static void
-remove_pending_rerun_queue (OstreeFetcherPendingURI *pending)
+remove_pending (OstreeFetcherPendingURI *pending)
 {
   /* Hold a temporary ref to ensure the reference to
    * pending->thread_closure is valid.
    */
   pending_uri_ref (pending);
   g_hash_table_remove (pending->thread_closure->outstanding, pending);
-  session_thread_process_pending_queue (pending->thread_closure);
   pending_uri_unref (pending);
 }
 
@@ -976,7 +942,7 @@ on_out_splice_complete (GObject        *object,
   if (local_error)
     {
       g_task_return_error (task, local_error);
-      remove_pending_rerun_queue (pending);
+      remove_pending (pending);
     }
 
   g_object_unref (task);
@@ -1018,7 +984,7 @@ on_stream_read (GObject        *object,
                                  g_strdup (pending->out_tmpfile),
                                  (GDestroyNotify) g_free);
         }
-      remove_pending_rerun_queue (pending);
+      remove_pending (pending);
     }
   else
     {
@@ -1057,7 +1023,7 @@ on_stream_read (GObject        *object,
   if (local_error)
     {
       g_task_return_error (task, local_error);
-      remove_pending_rerun_queue (pending);
+      remove_pending (pending);
     }
 
   g_object_unref (task);
@@ -1096,7 +1062,7 @@ on_request_sent (GObject        *object,
           g_task_return_pointer (task,
                                  g_strdup (pending->out_tmpfile),
                                  (GDestroyNotify) g_free);
-          remove_pending_rerun_queue (pending);
+          remove_pending (pending);
           goto out;
         }
       else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
@@ -1110,10 +1076,8 @@ on_request_sent (GObject        *object,
                 goto out;
 
               (void) g_input_stream_close (pending->request_body, NULL, NULL);
-              g_queue_insert_sorted (&pending->thread_closure->pending_queue,
-                                     g_object_ref (task), pending_task_compare,
-                                     NULL);
-              remove_pending_rerun_queue (pending);
+
+              start_pending_request (pending->thread_closure, task);
             }
           else
             {
@@ -1204,7 +1168,7 @@ on_request_sent (GObject        *object,
       if (pending->request_body)
         (void) g_input_stream_close (pending->request_body, NULL, NULL);
       g_task_return_error (task, local_error);
-      remove_pending_rerun_queue (pending);
+      remove_pending (pending);
     }
 
   g_object_unref (task);